/**
 * 
 */
package com.ihome.framework.core.event;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Resource;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Scope;
import org.springframework.core.NamedThreadLocal;
import org.springframework.util.Assert;

import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.ihome.framework.core.cache.redis.IHomeRedisClient;
import com.ihome.framework.core.event.IHomeEventBusHandler.EventBusWorkHandler;
import com.ihome.framework.core.event.IHomeEventBusHandler.EventDto;
import com.ihome.framework.core.event.command.CommandDispatch;
import com.ihome.framework.core.event.command.buffer.CommandBuffer.CommandCollector;
import com.ihome.framework.core.event.command.buffer.CommandBuffer.CommandDto;
import com.ihome.framework.core.event.command.buffer.CommandBuffer.SqlCommandDto;
import com.ihome.framework.core.event.command.buffer.mybatis.DaoMapperProxy;
import com.ihome.framework.core.event.command.handler.SimpleCommandEventHandler;
import com.ihome.framework.core.event.group.EventBusHandlerGroup;
import com.ihome.framework.core.event.group.IHomeEventBusGroupRegister;
import com.ihome.framework.core.id.service.IdGeneratorService.UUIDWorker;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;




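/**
 * Base class for an event bus backed by an LMAX {@link Disruptor}.
 * <p>
 * An instance creates and starts the disruptor when Spring calls
 * {@link #afterPropertiesSet()}, publishes events into its {@link RingBuffer}
 * through {@link #publishEvent(IHomeEventTranslator)}, serves as the
 * {@link EventFactory} and {@link ExceptionHandler} for events of type {@code E},
 * and shuts the disruptor down in {@link #destroy()}.
 *
 * @author zww
 *
 */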
public  abstract class IHomeEventBusHandler<E extends EventDto> implements ApplicationContextAware,ExceptionHandler<E>,InitializingBean,DisposableBean,EventFactory<E>,IHomeEventProducer<E>,IHomeEventConsumer<E>{

    // Ring buffer of the started disruptor; assigned in build() and null until then.
    private RingBuffer<E> ringBuffer = null;
    // Ring buffer capacity; the main constructor rejects values that are not a positive power of two.
    private final int bufferSize;
    // Buffer size used by the no-arg constructor.
    public  final static int DEFAULT_BUFFER_SIZE = 128;
    // Wait strategy handed to the Disruptor constructor in afterPropertiesSet().
    private final WaitStrategy waitStrategy;
    // SINGLE or MULTI producer mode handed to the Disruptor (original note: MULTI uses a CAS sequence).
    private final ProducerType producerType;
    // The disruptor itself; created in afterPropertiesSet(), so null before that callback runs.
    private Disruptor<E> INSTANCE;
    // Concrete event class, captured in the main constructor from a probe newInstance() call.
    private final Class<?> EVENT_CLASS;
    // Threshold in milliseconds above which EventBusWorkHandler.onEvent logs a slow-handler warning.
    private final static int slowTime = 30;


    // Spring application context, injected through setApplicationContext().
    private transient ApplicationContext applicationContext;


    // Flipped to true by build() and back to false by destroy(); guards publishEvent() and getDisruptor().
    private AtomicBoolean isStart = new AtomicBoolean(false);

    // Per-instance logger named after the concrete subclass.
    protected transient Logger LOGGER = LoggerFactory.getLogger(getClass());


    // Thread factory passed to the Disruptor constructor.
    private ThreadFactory threadFactory = new NamedThreadFactory();

    /**
     * Creates an event bus with an explicit buffer size, producer type and wait strategy.
     * <p>
     * The event class is captured here by calling {@link #newInstance()} once. With the
     * default {@code newInstance()} this means the concrete subclass must bind its event
     * type as the first type argument of its superclass, and that event type must have
     * an accessible no-arg constructor.
     *
     * @param bufferSize   ring buffer capacity; must be a positive power of two
     * @param producerType single- or multi-producer mode; must not be {@code null}
     * @param waitStrategy wait strategy handed to the disruptor; must not be {@code null}
     * @throws IllegalArgumentException if an argument violates the constraints above or
     *                                  the event class cannot be resolved/instantiated
     */
    public IHomeEventBusHandler(int bufferSize,ProducerType producerType,WaitStrategy waitStrategy){
        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // Fail fast here instead of with an anonymous NullPointerException when the disruptor is built.
        Assert.notNull(producerType,"producer type must not be null");
        Assert.notNull(waitStrategy,"wait strategy must not be null");
        this.bufferSize = bufferSize;
        this.producerType = producerType;
        this.waitStrategy = waitStrategy;
        // newInstance() is overridable and reads EVENT_CLASS, which is still null at this
        // point, so the default implementation takes its reflection branch for this probe.
        this.EVENT_CLASS = newInstance().getClass();
    }

    /**
     * Creates an event bus that uses a {@link YieldingWaitStrategy}.
     *
     * @param bufferSize   ring buffer capacity, validated by the three-argument constructor
     * @param producerType single- or multi-producer mode
     */
    public IHomeEventBusHandler(int bufferSize,ProducerType producerType){
        this(bufferSize, producerType, new YieldingWaitStrategy());
    }

    /**
     * Creates an event bus in {@link ProducerType#SINGLE} mode.
     *
     * @param bufferSize ring buffer capacity, validated by the three-argument constructor
     */
    public IHomeEventBusHandler(int bufferSize){
        this(bufferSize,ProducerType.SINGLE);
    }

    /**
     * Creates an event bus with {@link #DEFAULT_BUFFER_SIZE} slots.
     */
    public IHomeEventBusHandler(){
        this(DEFAULT_BUFFER_SIZE);
    }

    /**
     * Stores the Spring application context; it is later passed to
     * {@code buildEventsWith}, to the handler groups created by this class, and used
     * to look up handler beans in {@code handleEventsWithPool(Class, Integer)}.
     *
     * @param applicationContext the context this bean runs in
     * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }


    /**
     * Work handler variant that additionally has a {@link DaoMapperProxy} injected,
     * exposed to subclasses through {@link #invoke(Object, String, Class, Object...)}.
     */
    @Scope("prototype")
    public static abstract class EventBusWorkDBHandler<E extends EventDto> extends EventBusWorkHandler<E> implements EventHandler<E>,WorkHandler<E>,InitializingBean,DisposableBean{

        // Injected by name/type through @Resource.
        @Resource
        protected DaoMapperProxy mapperProxy;

        /**
         * Forwards the call to {@code mapperProxy.invoke} with the same arguments.
         *
         * @param proxy      passed through to the mapper proxy unchanged
         * @param methodName passed through to the mapper proxy unchanged
         * @param clz        passed through to the mapper proxy unchanged
         * @param args       passed through to the mapper proxy unchanged
         * @return always {@code null}; the result of the delegate call is discarded
         * @throws Exception whatever the mapper proxy throws
         */
        protected <T extends SqlCommandDto> Void invoke(Object proxy,String methodName,Class<T> clz,Object... args) throws Exception{
            mapperProxy.invoke(proxy, methodName, clz, args);
            return null;
        }

    }



    /**
     * Base class for event consumers usable both as a Disruptor {@link EventHandler}
     * and as a {@link WorkHandler}.
     * <p>
     * While {@link #handler(EventDto, long, boolean)} runs, the event being processed is
     * bound to {@link #LOCAL_EVENT} so that {@link #getLocalEvent()} and {@link #flush()}
     * can reach it without it being passed around.
     */
    @Scope("prototype")
    public static abstract class EventBusWorkHandler<E extends EventDto> implements EventHandler<E>,WorkHandler<E>,InitializingBean,DisposableBean{

        protected transient Logger LOGGER = LoggerFactory.getLogger(getClass());

        private CommandDispatch dispatch;

        protected IHomeRedisClient redisClient;

        /** Event currently being handled on the calling thread, if any. */
        public static final ThreadLocal<EventDto> LOCAL_EVENT = new NamedThreadLocal<EventDto>("local events");

        /**
         * @return the event bound to the current thread, or {@code null} when none is bound
         */
        public  E getLocalEvent(){
            final EventDto bound = LOCAL_EVENT.get();
            return (E) bound;
        }

        /**
         * Runs {@link #handler(EventDto, long, boolean)} for events that carry no error,
         * skips (with a debug log) events that do, and warns when the call took longer
         * than the slow-handler threshold. {@code SimpleCommandEventHandler} instances are
         * exempt from the slow-handler warning.
         *
         * @see com.lmax.disruptor.EventHandler#onEvent(java.lang.Object, long, boolean)
         */
        @Override
        public  void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {
            final long startedAt = System.currentTimeMillis();
            try {
                if (event.hasErrorOrException()) {
                    LOGGER.debug("pass handler requestId:{}",event.getRequestId());
                    return;
                }
                try {
                    LOCAL_EVENT.set(event);
                    handler(event, sequence, endOfBatch);
                } finally {
                    // Always unbind so the thread never keeps a reference to a finished event.
                    LOCAL_EVENT.remove();
                }
            } finally {
                final long elapsed = System.currentTimeMillis() - startedAt;
                final boolean timed = !(this instanceof SimpleCommandEventHandler);
                if (timed && elapsed > slowTime) {
                    LOGGER.warn("requestId:{}-handler {} execution time is too long... time:{}ms",event.getRequestId(),getClass(),elapsed);
                }
            }
        }

        /**
         * Work-pool entry point; delegates to the three-argument variant with
         * sequence {@code -1} and {@code endOfBatch == false}.
         *
         * @see com.lmax.disruptor.WorkHandler#onEvent(java.lang.Object)
         */
        @Override
        public  void onEvent(E event) throws Exception{
            onEvent(event, -1, false);
        }

        /**
         * Business logic of the handler.
         *
         * @param event      the event to process
         * @param sequence   ring buffer sequence, or {@code -1} when invoked as a work handler
         * @param endOfBatch batch flag as received by {@code onEvent}
         */
        public abstract void handler(E event,long sequence,boolean endOfBatch) throws Exception;

        /**
         * Dispatches every command collected on the thread-bound event and then clears
         * the event's collected state, whether or not dispatching succeeded.
         *
         * @throws IllegalArgumentException if no event is bound to the current thread
         */
        public void flush() throws Exception{
            final E current = (E) LOCAL_EVENT.get();
            // Without a bound event there is nothing to clear afterwards, so fail up front.
            Assert.notNull(current,"thread local not event! check code...");
            try {
                final List<CommandDto> pending = current.getCommandCollector().getCommandList();
                if (pending == null) {
                    return;
                }
                for (CommandDto command : pending) {
                    dispatch.dispatch(command);
                }
            } finally {
                current.clearForGc(); // clear command collector exec gc
            }
        }

        /**
         * No-op hook; subclasses may override.
         *
         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            // intentionally empty
        }

        /**
         * No-op hook; subclasses may override.
         *
         * @see org.springframework.beans.factory.DisposableBean#destroy()
         */
        @Override
        public void destroy() throws Exception {
            // intentionally empty
        }

        /**
         * @param dispatch the command dispatcher used by {@link #flush()}
         */
        @Autowired
        public void setDispatch(CommandDispatch dispatch) {
            this.dispatch = dispatch;
        }

        /**
         * @param redisClient the redis client made available to subclasses
         */
        @Autowired
        public void setRedisClient(IHomeRedisClient redisClient) {
            this.redisClient = redisClient;
        }

    }




    /**
     * Base event carried through the ring buffer: a request id, a success flag with an
     * optional error message, a free-form parameter map and a collector of commands.
     * <p>
     * NOTE(review): {@link #toString()} and {@link #hashCode()} are reflection based, so
     * their results depend on the declared fields; keep that in mind before adding,
     * removing or reordering fields here or in subclasses.
     */
    public static class EventDto implements Serializable{

        private static final long serialVersionUID = 1L;

        // Commands added through addCommand(); emptied by clearForGc().
        private final CommandCollector commandCollector = new CommandCollector();

        // Fixed at construction time; there is no setter.
        private final String requestId;

        protected String errorMessage;

        // Starts out true; hasErrorOrException() reports an error once this is false.
        protected boolean success = true;

        // Free-form parameters set through setParam(); emptied by clearForGc().
        private final Map<String, Object> paramsHashMap = new HashMap<String, Object>();

        /**
         * Creates an event whose request id comes from {@code UUIDWorker.getNextId()}.
         */
        public EventDto(){
            this(UUIDWorker.getNextId());
        }

        /**
         * @param requestId the request id to carry; stored as given
         */
        public EventDto(String requestId){
            this.requestId = requestId;
        }

        /**
         * Stores a parameter, replacing any previous value under the same key.
         */
        public void setParam(String key,Object param){
            paramsHashMap.put(key, param);
        }

        /**
         * Returns the parameter stored under {@code key}, or {@code null} if absent.
         * The cast to {@code T} is unchecked, so a wrong target type only fails at the
         * caller's use site.
         */
        public <T> T getParam(String key){
            return (T) paramsHashMap.get(key);
        }

        /**
         * Adds a command to the collector.
         *
         * @throws IllegalArgumentException if {@code command} is {@code null}
         */
        public void addCommand(CommandDto command){
            Assert.notNull(command,"args command is null");
            commandCollector.addCommand(command);
        }

        /**
         * @return the requestId
         */
        public String getRequestId() {
            return requestId;
        }

        /**
         * @param success the success to set
         */
        public void setSuccess(boolean success) {
            this.success = success;
        }

        /**
         * @param errorMessage the errorMessage to set
         */
        public void setErrorMessage(String errorMessage) {
            this.errorMessage = errorMessage;
        }

        /**
         * @return the commandCollector
         */
        public CommandCollector getCommandCollector() {
            return commandCollector;
        }
        /**
         * @return the errorMessage
         */
        public String getErrorMessage() {
            return errorMessage;
        }

        /**
         * @return {@code true} when a non-blank error message is set or the success
         *         flag is {@code false}
         */
        public boolean hasErrorOrException() {
            return StringUtils.isNotBlank(errorMessage) ||  !success;
        }

        /**
         * Empties the command collector and the parameter map. The success flag and
         * the error message are left untouched.
         */
        public void clearForGc(){
            commandCollector.clear();
            paramsHashMap.clear();
        }


        /**
         * Reflection-based dump of the fields in short-prefix style.
         */
        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }

        /**
         * Reflection-based hash over the current field values.
         * <p>
         * NOTE(review): no matching {@code equals} is defined in this class, and the
         * value changes whenever the mutable fields change — confirm that instances are
         * never used as keys in hash-based collections.
         */
        @Override
        public int hashCode() {
            return HashCodeBuilder.reflectionHashCode(this);
        }
    }

    /**
     * Creates a fresh event instance.
     * <p>
     * Once the event class has been captured in {@code EVENT_CLASS} it is reused;
     * before that (i.e. during construction) the class is resolved from the first type
     * argument of the concrete subclass's generic superclass. The instance is created
     * through the event class's no-arg constructor.
     *
     * @return a new, never {@code null}, event
     * @throws IllegalArgumentException if the event class cannot be resolved or cannot
     *                                  be instantiated; the underlying failure is
     *                                  attached as the cause
     * @see com.lmax.disruptor.EventFactory#newInstance()
     */
    @Override
    @SuppressWarnings("unchecked")
    public E newInstance() {
        Class<E> clz;
        if(EVENT_CLASS != null){
            clz = (Class<E>) EVENT_CLASS;
        }else{
            try{
                clz = (Class<E>) getSuperClassActualTypeArguments(getClass(), 0);
            } catch (Exception ex) {
                // Same exception type and message as before, but the root cause is kept.
                throw new IllegalArgumentException("not find event type class.", ex);
            }
        }
        Assert.notNull(clz,"not find event type class.");
        try {
            return clz.newInstance();
        } catch (Exception ex) {
            // Same exception type and message as before, but the root cause is kept.
            throw new IllegalArgumentException("new instance error! not class empty args construction method", ex);
        }
    }

    /**
     * Claims the next ring buffer slot, lets the translator fill the slot's event and
     * publishes it.
     * <p>
     * Ring buffer slots are reused, so the slot's failure state is reset before the
     * translator runs. A claimed sequence must always be published; if the translator
     * throws, the event is therefore still published but marked as failed so that
     * handlers skip it, and the exception is rethrown to the caller.
     *
     * @param translator fills the claimed event; must not be {@code null}
     * @throws IllegalArgumentException if the bus is not started or {@code translator}
     *                                  is {@code null}
     */
    @Override
    public void publishEvent(IHomeEventTranslator<E> translator){

        Assert.isTrue(isStart.get(),"event bus service<"+EVENT_CLASS+"> not start.");
        // Validate before claiming: a sequence claimed for a null translator would have
        // to be published anyway and would replay whatever the slot last contained.
        Assert.notNull(translator,"event translator must not be null");
        long sequence = ringBuffer.next();  // Grab the next sequence
        try{
            E event = ringBuffer.get(sequence); // Get the entry in the Disruptor
            // Drop failure state left behind by the previous occupant of this slot.
            event.setErrorMessage(null);
            event.setSuccess(true);
            try{
                translator.translateTo(event);
            }catch (RuntimeException ex) {
                // The half-translated event still has to be published; flag it so handlers pass over it.
                event.setErrorMessage(ex.getMessage());
                event.setSuccess(false);
                throw ex;
            }
        }finally {
            ringBuffer.publish(sequence); // finally send out sequence
        }

    }




    /**
     * Builds the disruptor with this object as event factory and default exception
     * handler, lets {@code buildEventsWith} register the handlers, and starts the bus.
     *
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        INSTANCE = new Disruptor<E>(this, bufferSize, threadFactory, producerType, waitStrategy);
        INSTANCE.setDefaultExceptionHandler(this);

        // Handler wiring must be complete before the disruptor is started by build().
        buildEventsWith(this, applicationContext);
        build();
    }

    /**
     * Marks the failed event as unsuccessful, logs the failure and clears the event's
     * collected commands and parameters.
     * <p>
     * The disruptor may report a failure without an event (for example when the
     * exception is raised before an event was fetched); that case is only logged,
     * because throwing from the exception handler would take the processor down.
     *
     * @param ex       the failure raised while processing
     * @param sequence the sequence being processed when the failure occurred
     * @param event    the event being processed; may be {@code null}
     * @see com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable, long, java.lang.Object)
     */
    @Override
    public void handleEventException(Throwable ex, long sequence, E event) {
        if (event == null) {
            LOGGER.error(String.format("sequence:%s|message:%s", sequence, ex.getMessage()), ex);
            return;
        }
        event.setErrorMessage(ex.getMessage());
        event.setSuccess(Boolean.FALSE);
        LOGGER.error(String.format("requestId:%s|event:%s|message:%s", event.getRequestId(),event,ex.getMessage()), ex);
        event.clearForGc();
    }
    /**
     * Logs a failure raised while a processor shuts down; nothing else is done.
     *
     * @see com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang.Throwable)
     */
    @Override
    public void handleOnShutdownException(Throwable ex) {
        final String message = ex.getMessage();
        LOGGER.error(message, ex);
    }
    /**
     * Logs a failure raised while a processor starts; nothing else is done.
     *
     * @see com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.Throwable)
     */
    @Override
    public void handleOnStartException(Throwable ex) {
        final String message = ex.getMessage();
        LOGGER.error(message, ex);
    }


    /**
     * Starts the disruptor and captures its ring buffer. Calls made while the bus is
     * already marked as started are ignored.
     * <p>
     * If starting fails, the started flag is rolled back so that
     * {@code publishEvent} and {@code getDisruptor} do not treat a bus that never came
     * up as running; the failure is rethrown unchanged.
     *
     * @see com.ihome.framework.core.event.IHomeEventConsumer#build()
     */
    @Override
    public void build() {
        if(!isStart.compareAndSet(false, true)){
            return;
        }
        try{
            INSTANCE.start();
            ringBuffer = INSTANCE.getRingBuffer();
        }catch (RuntimeException ex) {
            isStart.set(false);
            throw ex;
        }
    }

    /**
     * Shuts the disruptor down if the bus is started, waiting up to ten seconds for
     * the backlog to drain.
     * <p>
     * {@code Disruptor.shutdown(timeout, unit)} only halts the processors once the
     * backlog is empty; when it gives up with an exception the processors are halted
     * explicitly so their threads do not outlive this bean, and the exception is
     * rethrown.
     *
     * @throws Exception the failure reported by the disruptor shutdown, e.g. its timeout
     * @see org.springframework.beans.factory.DisposableBean#destroy()
     */
    @Override
    public void destroy() throws Exception {
        if(!isStart.compareAndSet(true, false)){
            return;
        }
        LOGGER.info("Service -> Event[{}] call shutdown()",EVENT_CLASS);
        try{
            INSTANCE.shutdown(10, TimeUnit.SECONDS);
        }catch (Exception ex) {
            LOGGER.warn("Service -> Event[{}] shutdown did not complete, halting processors",EVENT_CLASS,ex);
            INSTANCE.halt();
            throw ex;
        }
    }



    /**
     * Gives access to the underlying disruptor.
     *
     * @return the disruptor owned by this bus
     * @throws IllegalArgumentException if the bus is not marked as started
     * @see com.ihome.framework.core.event.IHomeEventConsumer#getDisruptor()
     */
    @Override
    public Disruptor<E> getDisruptor() {
        final boolean started = isStart.get();
        Assert.isTrue(started,"event handler group service not start...");
        return INSTANCE;
    }

    /**
     * Registers the given handlers through {@code Disruptor.handleEventsWith} and wraps
     * the resulting group, together with the application context, in an
     * {@link EventBusHandlerGroup}.
     *
     * @param handlers the handlers to register
     * @return a register object wrapping the created handler group
     * @see com.ihome.framework.core.event.IHomeEventConsumer#handleEventsWith(com.ihome.framework.core.event.IHomeEventBusHandler.EventBusWorkHandler[])
     */
    @Override
    public IHomeEventBusGroupRegister<E> handleEventsWith(EventBusWorkHandler<E>... handlers) {
        return new EventBusHandlerGroup<E>(INSTANCE.handleEventsWith(handlers),applicationContext);
    }

    /**
     * Registers the given handlers as a worker pool through
     * {@code Disruptor.handleEventsWithWorkerPool} and wraps the resulting group,
     * together with the application context, in an {@link EventBusHandlerGroup}.
     *
     * @param handlers the work handlers forming the pool
     * @return a register object wrapping the created handler group
     * @see com.ihome.framework.core.event.IHomeEventConsumer#handleEventsWithPool(com.ihome.framework.core.event.IHomeEventBusHandler.EventBusWorkHandler[])
     */
    @Override
    public IHomeEventBusGroupRegister<E> handleEventsWithPool(EventBusWorkHandler<E>... handlers) {
        return new EventBusHandlerGroup<E>(INSTANCE.handleEventsWithWorkerPool(handlers),applicationContext);
    }

    /**
     * Builds a worker pool of {@code consumerSize} handlers by fetching the handler
     * bean {@code consumerSize} times from the application context.
     * <p>
     * Each fetched bean is compared by identity with the one fetched just before it;
     * receiving the same instance twice in a row is rejected, which is what happens
     * when the handler bean is not prototype scoped.
     *
     * @param clz          handler bean type to look up
     * @param consumerSize number of handler instances to put in the pool
     * @return a register object wrapping the created handler group
     * @throws IllegalArgumentException if two consecutive lookups return the same instance
     */
    @Override
    public IHomeEventBusGroupRegister<E> handleEventsWithPool(Class<? extends EventBusWorkHandler<E>> clz,Integer consumerSize) {
        final List<EventBusWorkHandler<E>> pool = new ArrayList<EventBusWorkHandler<E>>();
        EventBusWorkHandler<E> previous = null;
        int created = 0;
        while (created < consumerSize) {
            final EventBusWorkHandler<E> candidate = applicationContext.getBean(clz);
            if (created > 0) {
                // check
                Assert.isTrue(previous != candidate,"work handler with consumer @Scope(value='prototype')");
            }
            pool.add(candidate);
            previous = candidate;
            created++;
        }
        final EventBusWorkHandler[] workers = pool.toArray(new EventBusWorkHandler[0]);
        return handleEventsWithPool(workers);
    }


    /**
     * Registers {@code handlers} to run after {@code afterHandlers}, using
     * {@code Disruptor.after(...).handleEventsWith(...)}.
     *
     * @param afterHandlers handlers passed to {@code Disruptor.after}
     * @param handlers      handlers to run after them
     * @return this consumer, for chaining
     * @see com.ihome.framework.core.event.IHomeEventConsumer#afterHandleEventsWith(com.ihome.framework.core.event.IHomeEventBusHandler.EventBusWorkHandler[], com.ihome.framework.core.event.IHomeEventBusHandler.EventBusWorkHandler[])
     */
    @Override
    public IHomeEventConsumer<E> afterHandleEventsWith(EventBusWorkHandler<E>[] afterHandlers, EventBusWorkHandler<E>... handlers) {
        INSTANCE.after(afterHandlers).handleEventsWith(handlers);
        return this;
    }

    /**
     * Registers {@code handlers} as a worker pool that runs after {@code afterHandlers},
     * using {@code Disruptor.after(...).handleEventsWithWorkerPool(...)}.
     *
     * @param afterHandlers handlers passed to {@code Disruptor.after}
     * @param handlers      work handlers forming the pool that runs after them
     * @return this consumer, for chaining
     * @see com.ihome.framework.core.event.IHomeEventConsumer#afterHandleEventsWithPool(com.ihome.framework.core.event.IHomeEventBusHandler.EventBusWorkHandler[], com.ihome.framework.core.event.IHomeEventBusHandler.EventBusWorkHandler[])
     */
    @Override
    public IHomeEventConsumer<E> afterHandleEventsWithPool(EventBusWorkHandler<E>[] afterHandlers, EventBusWorkHandler<E>... handlers) {
        INSTANCE.after(afterHandlers).handleEventsWithWorkerPool(handlers);
        return this;
    }


    /**
     * Resolves a type argument that {@code clz} (or its nearest ancestor with a
     * parameterized superclass) binds on its generic superclass.
     * <p>
     * The hierarchy is walked upwards until a class whose generic superclass is
     * parameterized is found, so a plain subclass of an already bound class resolves
     * the same argument as its parent. A type argument that is itself parameterized
     * (e.g. {@code List<String>}) resolves to its raw class.
     *
     * @param clz   the class to inspect; must not be {@code null}
     * @param index zero-based position of the type argument; must not be {@code null}
     * @return the class bound at {@code index}
     * @throws IllegalArgumentException if an argument is {@code null}, no parameterized
     *                                  superclass exists, {@code index} is out of range,
     *                                  or the argument is not a concrete class
     *                                  (type variable, wildcard, generic array)
     */
    public static Class<?> getSuperClassActualTypeArguments(Class<?> clz,Integer index) throws Exception{
        if (index == null) {
            throw new IllegalArgumentException("index must not be null");
        }
        if (clz == null) {
            throw new IllegalArgumentException("clz must not be null");
        }
        for (Class<?> current = clz; current != null; current = current.getSuperclass()) {
            java.lang.reflect.Type superType = current.getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                continue; // raw or absent superclass: keep climbing
            }
            java.lang.reflect.Type[] typeArgs = ((ParameterizedType) superType).getActualTypeArguments();
            if (index < 0 || index >= typeArgs.length) {
                throw new IllegalArgumentException("type argument index " + index + " out of range for " + superType);
            }
            java.lang.reflect.Type typeArg = typeArgs[index];
            if (typeArg instanceof Class) {
                return (Class<?>) typeArg;
            }
            if (typeArg instanceof ParameterizedType) {
                return (Class<?>) ((ParameterizedType) typeArg).getRawType();
            }
            throw new IllegalArgumentException("type argument " + typeArg + " of " + superType + " is not a concrete class");
        }
        throw new IllegalArgumentException(clz.getName() + " has no parameterized superclass");
    }


    /**
     * Resolves a type argument that {@code clz} binds on one of the interfaces it
     * directly declares.
     * <p>
     * A type argument that is itself parameterized (e.g. {@code List<String>})
     * resolves to its raw class.
     *
     * @param clz        the class to inspect; must not be {@code null}
     * @param interIndex zero-based position of the interface in the class's
     *                   {@code implements} list; must not be {@code null}
     * @param index      zero-based position of the type argument on that interface;
     *                   must not be {@code null}
     * @return the class bound at {@code index}
     * @throws IllegalArgumentException if an argument is {@code null}, an index is out
     *                                  of range, the selected interface is not
     *                                  parameterized, or the argument is not a concrete
     *                                  class (type variable, wildcard, generic array)
     */
    public static Class<?> getSuperInterfacesActualTypeArguments(Class<?> clz,Integer interIndex,Integer index) throws Exception{
        if (index == null) {
            throw new IllegalArgumentException("index must not be null");
        }
        if (interIndex == null) {
            throw new IllegalArgumentException("interIndex must not be null");
        }
        if (clz == null) {
            throw new IllegalArgumentException("clz must not be null");
        }
        java.lang.reflect.Type[] interfaces = clz.getGenericInterfaces();
        if (interIndex < 0 || interIndex >= interfaces.length) {
            throw new IllegalArgumentException("interface index " + interIndex + " out of range for " + clz.getName());
        }
        java.lang.reflect.Type selected = interfaces[interIndex];
        if (!(selected instanceof ParameterizedType)) {
            throw new IllegalArgumentException("interface " + selected + " of " + clz.getName() + " is not parameterized");
        }
        java.lang.reflect.Type[] typeArgs = ((ParameterizedType) selected).getActualTypeArguments();
        if (index < 0 || index >= typeArgs.length) {
            throw new IllegalArgumentException("type argument index " + index + " out of range for " + selected);
        }
        java.lang.reflect.Type typeArg = typeArgs[index];
        if (typeArg instanceof Class) {
            return (Class<?>) typeArg;
        }
        if (typeArg instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) typeArg).getRawType();
        }
        throw new IllegalArgumentException("type argument " + typeArg + " of " + selected + " is not a concrete class");
    }





}
